Skip to content

Conversation

@akshaychitneni
Copy link
Contributor

What this PR does / why we need it:
Adds dataset initializer to bootstrap cache

Which issue(s) this PR fixes (optional, in Fixes #<issue number>, #<issue number>, ... format, will close the issue(s) when PR gets merged):
Fixes # #2792

Checklist:

  • Docs included if any changes are user facing

@coveralls
Copy link

coveralls commented Aug 20, 2025

Pull Request Test Coverage Report for Build 18470076977

Details

  • 0 of 0 changed or added relevant lines in 0 files are covered.
  • No unchanged relevant lines lost coverage.
  • Overall coverage remained the same at 54.488%

Totals Coverage Status
Change from base Build 18375028578: 0.0%
Covered Lines: 1214
Relevant Lines: 2228

💛 - Coveralls

@akshaychitneni akshaychitneni changed the title KEP-2655: Adding cache initializer feat - KEP-2655: Adding cache initializer Aug 20, 2025
Copy link
Member

@andreyvelich andreyvelich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @akshaychitneni!
I left my initial comments.

@andreyvelich andreyvelich changed the title feat - KEP-2655: Adding cache initializer feat: KEP-2655: Adding cache initializer Aug 26, 2025
@andreyvelich
Copy link
Member

/milestone v2.1

@google-oss-prow google-oss-prow bot added this to the v2.1 milestone Sep 24, 2025
@akshaychitneni akshaychitneni force-pushed the cache_initilizer branch 3 times, most recently from 2392316 to e20b1c1 Compare September 25, 2025 20:59
@akshaychitneni akshaychitneni changed the title feat: KEP-2655: Adding cache initializer feat(cache): KEP-2655: Adding cache initializer Sep 25, 2025
@akshaychitneni akshaychitneni force-pushed the cache_initilizer branch 3 times, most recently from 5c77408 to e3b9fad Compare September 25, 2025 22:17
Copy link
Member

@andreyvelich andreyvelich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @akshaychitneni, I left a few comments.

cc @kubeflow/kubeflow-trainer-team @rudeigerc appreciate your review as well!

@dataclass
class CacheDatasetInitializer:
storage_uri: str
train_job_name: str
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also is not needed, the TrainJob name is equal to JobSet name.
We can set the ENV variable in the ClusterTrainingRuntime: TRAIN_JOB_NAME which is getting the value from
metadata.labels['jobset.sigs.k8s.io/jobset-name']

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I plan to set as env var on initializer container using downward api and here I am accessing that env to use trainjob name for ownerRef.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@akshaychitneni If you get TrainJob name from the env, you don't need to set it in the CacheDatasetInitializer.
Similar to namespace:
https://github.com/kubeflow/trainer/pull/2793/files#diff-ed9e751df204997160579feb800b458887ed801b5caab572c0b5142b2e63129bR52

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But isn't it similar to fetching from env variable? for namespace, I am not using env var

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CacheDatasetInitializer is config that we expose to the end-user. Since TrainJob will be always set via env variable, you don't need to expose this parameter in the config that user can adjust.
Does it make sense @akshaychitneni ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not directly exposed to the users though. we create it from env vars -

def get_config_from_env(config) -> Dict[str, str]:
. And these env vars from be pushed from sdk

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, so you are going to have TRAIN_JOB_NAME env being set in your ClusterTrainingRuntime, correct ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

logging.info(f"Created LeaderWorkerSet {train_job_name}-cache")

# Create Service
service = client.V1Service(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LWS doesn't have any API to create service automatically ?

Maybe @kerthcet @kannon92 @ahg-g @ardaguclu knows about it ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think lws controller doesn't create/manage service objects and it is upto clients to define.

@akshaychitneni akshaychitneni force-pushed the cache_initilizer branch 4 times, most recently from dface27 to efbebae Compare September 30, 2025 23:20
Copy link
Member

@andreyvelich andreyvelich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @akshaychitneni, just left a few comments.
/cc @kubeflow/kubeflow-trainer-team @rudeigerc in case you want to left more comments.

Comment on lines 295 to 297
],
)
def test_default_values(test_name, config_values, expected_defaults):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need this test case since you verify cluster creation here:

def test_create_cache_cluster(test_name, test_case):

Suggested change
],
)
def test_default_values(test_name, config_values, expected_defaults):

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I am validating config and in test_create_cache_cluster, I am looking for k8s api calls

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@akshaychitneni Can you just create another test case in the test_create_cache_cluster function?

@@ -0,0 +1,356 @@
from unittest.mock import MagicMock, patch
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@akshaychitneni Are you going to add integration tests in the future PRs ?

"HuggingFace - Invalid dataset",

Copy link
Contributor Author

@akshaychitneni akshaychitneni Oct 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I will work on adding integration tests in the future PRs

config_dict = utils.get_config_from_env(types.CacheDatasetInitializer)
self.config = types.CacheDatasetInitializer(**config_dict)

def download_dataset(self):
Copy link
Member

@andreyvelich andreyvelich Oct 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@akshaychitneni Did you review this ? Even that DatasetProvider interface doesn't have this API, we can still directly call cache.create_cache_cluster() API here:

cache.download_dataset()

"worker_cpu": "8",
"worker_mem": "16Gi",
},
"expected_substitutions": {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where do you verify expected_substitutions ?

@akshaychitneni akshaychitneni force-pushed the cache_initilizer branch 2 times, most recently from 5fc611b to 27c69ec Compare October 8, 2025 19:47
Copy link
Member

@andreyvelich andreyvelich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


# Get TrainJob for owner reference
try:
training_job = custom_api.get_namespaced_custom_object(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What gives the permissions to the TrainJob initializer to perform those requests to the API server?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Runtime should be configured with initializer having a serviceAccount with relevant permissions. We plan to document it.

schema_name = self.schema_name

# Load Kubernetes configuration
config.load_incluster_config()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not too deep into the design of this, so apologies for the out-of-context comment, but my first reaction is should all this be part of the control plane and not the runtime?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, ideally we should move it to operator, we just didn't get chance to work on this.
@akshaychitneni Maybe as a workaround before building a cache controller, we can use trainer-controller-manager to create LWS with the appropriate spec (e.g. the cache plugin can be activated when storageURI sets as follows: cache://database/table)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that was the initial plan to add as a plugin to trainer. As we intend to make leverage its own operator we haven't pursed that path. I think we can revisit this approach.

Comment on lines +107 to +111
annotations={
"eks.amazonaws.com/sts-regional-endpoints": "true",
"eks.amazonaws.com/role-arn": iam_role,
},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should that be made configurable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our initial implementation only support s3 via iam. I think it is good make this configurable once we support additional providers

Signed-off-by: Akshay Chitneni <achitneni@apple.com>
Copy link
Member

@andreyvelich andreyvelich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think, we should be good to move this forward.
@akshaychitneni Please create tracking issues for the @astefanutti's suggestions, so we can track them: #2793 (comment)
/lgtm
/approve

@google-oss-prow google-oss-prow bot added the lgtm label Oct 13, 2025
@google-oss-prow
Copy link

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: andreyvelich

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@google-oss-prow google-oss-prow bot merged commit 1581a07 into kubeflow:master Oct 13, 2025
30 of 31 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants